#include "config.h"

#include <limits.h>
#include <time.h>
#include <string.h>
#include <sys/epoll.h>
#include <semaphore.h>
#include <pthread.h>
#include <sys/eventfd.h>
#include <errno.h>

#define DBG_SUBSYS S_LIBYLIB

#include "configure.h"
#include "job_dock.h"
#include "timer.h"
#include "sysy_lib.h"
#include "adt.h"
#include "sysutil.h"
#include "ylock.h"
#include "dbg.h"

#define JOB_DOCK_SIZE 8192
#define JOBDOCK_SCAN_INTERVAL   10
#if 0
#define JOBDOCK_MUTEX_LOCK
#endif

#define MAX_QUEUED_LEN (1024 * 10)
#define QUEUE_EXTERN 32

/*
 * Per-worker queue of pending job handlers.
 *
 * Handlers are appended by __jobdock_worker_queue_func() and removed from the
 * front by __jobdock_worker_queue_pop(); both take `lock` around the access.
 */
typedef struct {
        sy_spinlock_t lock; /* guards count, total and array */
        sy_spinlock_t used; /* initialised in __jobdock_worker_queue_init(); no other use is visible in this file */
        int count; /* number of handlers currently stored in array */
        int total; /* capacity of array in handlers, grown by QUEUE_EXTERN at a time */
        char name[MAX_NAME_LEN]; /* queue name, printed in log messages */
        job_handler_t *array; /* heap array of queued handlers (NULL until first growth) */
} jobdock_queue_t;

/* Identity of a dock entry: its fixed slot plus a per-allocation sequence. */
typedef struct {
        uint32_t idx; /* index in the job dock; only set when the entry is initialised (__job_init) */
        uint32_t seq; /* assigned from __jobdock_seq() in job_create(), reset to 0 in __job_destroy() */
} job_id_t;

/*
 * One slot of the job dock.
 *
 * `job` must stay the first member: this file converts between job_t * and
 * dock_entry_t * with plain pointer casts (see job_idx(), __job_destroy()).
 */
typedef struct {
        job_t job;
        job_handler_t handler; /* handle returned by job_handler(); jobid is the slot index */
        int ret; /* value stored by __job_resume(), read back by job_get_ret() */
        job_id_t id; /* slot index and sequence; idx is only set when the entry is initialised */
        /*
         * Allocation flag: taken with sy_spin_trylock() when the slot is
         * handed out and released in __job_destroy(). Only ever use
         * sy_spin_trylock() on it, never a blocking lock.
         */
        sy_spinlock_t used;

        /* Per-entry lock taken around state changes and job execution. */
#ifdef JOBDOCK_MUTEX_LOCK
        pthread_mutex_t lock;
#else
        sy_spinlock_t lock;
#endif
        int events; /* set to 1 in job_create(); not read in the visible part of this file */
} dock_entry_t;

/*
 * The job dock itself: a fixed-size table of entry pointers followed by
 * bookkeeping used for load reporting. Allocated as one chunk in
 * jobdock_init() with room for JOB_DOCK_SIZE pointers after the header.
 */
typedef struct {
        int size; /*max size of the array*/
        int used; /*current used in the array*/
        uint64_t time_used; /* load figure maintained by jobdock_load()/jobdock_load_set() */
        uint32_t prev_time; /* time of the last load update, from gettime() */
        uint32_t last_use; /* tv_sec of the most recent __job_destroy() */
        uint32_t seq; /* counter consumed by __jobdock_seq() */
        sy_spinlock_t lock; /* guards used, last_use, the load fields and lazy slot allocation */
        int idx; /* slot where the next search in __jobdock_get_empty() starts */
        dock_entry_t *array[0]; /* trailing table of entries; a NULL slot has not been allocated yet */
} job_dock_t;

/* Analysis handle created in jobdock_init(); has external linkage. */
analysis_t *ana;

/* Defined in another translation unit. */
extern worker_handler_t jobtracker;
/* Timer handle that periodically runs __jobdock_watcher(). */
static  worker_handler_t __jobdock_watch;
/* The single job dock instance, allocated by jobdock_init(). */
static job_dock_t *jobdock;

/* Forward declaration: used by __job_resume() before its definition. */
static int __jobdock_worker_queue(const worker_handler_t *handler, job_handler_t *job_handler);

/*
 * Return the dock slot index of a job.
 *
 * The job pointer is reinterpreted as the dock_entry_t that embeds it as
 * its first member.
 */
uint32_t job_idx(job_t *job)
{
        dock_entry_t *entry = (dock_entry_t *)job;

        return entry->id.idx;
}

/*
 * Acquire the per-entry lock, blocking until it is available.
 * Returns the result of the underlying mutex/spinlock call.
 */
static int __job_lock(dock_entry_t *ent)
{
        int rc;

#ifdef JOBDOCK_MUTEX_LOCK
        rc = pthread_mutex_lock(&ent->lock);
#else
        rc = sy_spin_lock(&ent->lock);
#endif

        return rc;
}

/*
 * Try to acquire the per-entry lock without blocking.
 * Returns the result of the underlying trylock call (0 on success).
 */
static int __job_trylock(dock_entry_t *ent)
{
        int rc;

#ifdef JOBDOCK_MUTEX_LOCK
        rc = pthread_mutex_trylock(&ent->lock);
#else
        rc = sy_spin_trylock(&ent->lock);
#endif

        return rc;
}

/*
 * Release the per-entry lock.
 * Returns the result of the underlying unlock call.
 */
static int __job_unlock(dock_entry_t *ent)
{
        int rc;

#ifdef JOBDOCK_MUTEX_LOCK
        rc = pthread_mutex_unlock(&ent->lock);
#else
        rc = sy_spin_unlock(&ent->lock);
#endif

        return rc;
}

/*
 * Initialise a freshly allocated dock entry for slot `idx`.
 *
 * Sets the slot index, initialises the allocation flag and the per-entry
 * lock, and fills in the parts of the handler that never change
 * (handler.idx = 0, handler.jobid = idx).
 */
static void __job_init(dock_entry_t *ent, int idx)
{
        ent->id.idx = idx;
        (void) sy_spin_init(&ent->used);

#ifdef JOBDOCK_MUTEX_LOCK
        pthread_mutexattr_t attr;

        /*
         * An attribute object must be initialised before settype() may be
         * called on it, and should be destroyed once the mutex is created.
         */
        (void) pthread_mutexattr_init(&attr);
        (void) pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
        (void) pthread_mutex_init(&ent->lock, &attr);
        (void) pthread_mutexattr_destroy(&attr);
#else
        (void) sy_spin_init(&ent->lock);
#endif

        ent->handler.idx = 0;
        ent->handler.jobid = idx;
}

/*
 * Hand out the next sequence number.
 *
 * The dock counter is post-incremented and truncated to 16 bits; the value
 * (uint16_t)-1 is never returned - it is skipped and the next one is taken.
 * No lock is taken here.
 */
static uint16_t __jobdock_seq()
{
        uint16_t seq;

        for (;;) {
                seq = jobdock->seq++;
                if (seq != (uint16_t)-1)
                        break;

                DBUG("rollback\n");
        }

        return seq;
}

/*
 * Report whether a dock entry is currently allocated.
 *
 * Probes the `used` flag with a trylock: if the probe fails the flag is
 * held and 1 is returned; otherwise the flag is released again and 0 is
 * returned.
 */
static int __jobdock_used(dock_entry_t *ent)
{
        if (sy_spin_trylock(&ent->used) != 0)
                return 1;

        (void) sy_spin_unlock(&ent->used);

        return 0;
}

/*
 * Release a job back to the dock.
 *
 * Frees a separately allocated context, clears the per-allocation fields,
 * reports the job's total lifetime to the analysis module, decrements the
 * dock usage counter and finally drops the entry's `used` flag so the slot
 * can be handed out again. Logs an error and returns without touching the
 * entry if the slot is not marked as allocated.
 */
static void __job_destroy(job_t *job)
{
        int len;
        int64_t time_used;
        dock_entry_t *ent;
        struct timeval now;

        //YASSERT(strcmp(job->name, "none"));

        DBUG("destroy job[%u] %s\n", job_idx(job), job->name);

        //YASSERT(!job->in_queue);

        ent = (void *)job;

        /* destroying a slot that is not allocated is reported and ignored */
        if (__jobdock_used(ent) == 0) {
                DERROR("job %d un-alloc'ed\n", ent->id.idx);
                return;
        }

        /* only a context that is not the embedded buffer was heap allocated */
        if (job->context && job->context != job->__context__) {
                yfree((void **)&job->context);
        }

        ent->id.seq = 0;
        job->state_machine = 0;
        job->worker = NULL;

        /* lifetime of the job, measured from the creation timestamp */
        _gettimeofday(&now, NULL);
        time_used = _time_used(&job->timer.create, &now);

        if (time_used > 0) {
                analysis_queue(ana, job->name, "total", time_used);

                /* clamp implausibly large values before they are logged */
                if (time_used > INT64_MAX / 2)
                        time_used = INT64_MAX / 10;

                DBUG("used %llu\n", (LLU)time_used);

                /* report jobs whose value exceeds 5,000,000; the log prints it divided by 1,000,000 */
                if (time_used > 1000 * 1000 * 5) {
                        double used = ((double)time_used / 1000) / 1000;

                        if (used > 30) {
                                DWARN("job %s time used %f\n", job->name, used);
                        } else {
                                DINFO("job %s time used %f\n", job->name, used);
                        }
                }
        } else {
                DERROR("invald time\n");
        }

        sy_spin_lock(&jobdock->lock);

        jobdock->used--;
        jobdock->last_use = now.tv_sec;

        /* a zero creation time makes __jobdock_watcher() skip this entry */
        memset(&job->timer.create, 0x0, sizeof(job->timer.create));
        sy_spin_unlock(&jobdock->lock);

        /* tag the old name, if it fits, so stale references are recognisable in logs */
        len = strlen(job->name);
        if (len + strlen(".destroy") + 1 < JOB_NAME_LEN)
                strcpy(&(job->name[len]), ".destroy");

        /* releasing the flag is the last step: from here on the slot may be reused */
        sy_spin_unlock(&ent->used);
        //job->name[0] = '\0';

        return;
}

/*
 * Timer callback that looks for long-running jobs.
 *
 * At most once every JOBDOCK_SCAN_INTERVAL seconds it walks the dock and,
 * for every allocated entry created more than 10 seconds ago, logs the job
 * and asserts that it is younger than 1000 seconds. The timer is re-armed
 * on every invocation. `arg` is unused. Always returns 0.
 */
static int  __jobdock_watcher(void *arg)
{
        static time_t prev = 0;
        time_t now, created;
        dock_entry_t *ent;
        job_t *job;
        int ret, slot;

        (void )arg;

        if (prev == 0)
                prev = gettime();

        now = gettime();

        if (now - prev >= JOBDOCK_SCAN_INTERVAL) {
                DBUG("jobdock watch\n");

                prev = now;

                for (slot = 0; slot < jobdock->size; slot++) {
                        ent = jobdock->array[slot];
                        if (ent == NULL)
                                continue;

                        job = &ent->job;

                        if (!__jobdock_used(ent))
                                continue;

                        if ((now - job->timer.create.tv_sec) <= 10)
                                continue;

                        /* a zero creation time means the job was destroyed meanwhile */
                        created = job->timer.create.tv_sec;
                        if (created == 0)
                                continue;

                        DINFO("job[%d] %s status %u, used %u\n",
                               ent->id.idx, job->name, job->status,
                               (int)(now - created));
                        YASSERT((now - created) < 1000);
                }
        }

        ret = timer1_settime(&__jobdock_watch, USEC_PER_SEC * 3);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        return 0;
}

/*
 * Find and claim a free dock entry.
 *
 * Starting one past the slot used last time, the dock is scanned once
 * around. Slots that were never allocated are created lazily under the
 * dock lock. A slot is claimed when both its entry lock and its `used`
 * flag can be taken without blocking.
 *
 * On success *job points at the claimed job, 0 is returned, and BOTH the
 * entry lock and the `used` flag are still held - the caller releases the
 * entry lock once the job is set up. Returns ENOENT when every slot is
 * busy.
 *
 * When the dock is more than three quarters full the caller is throttled
 * with a short sleep. The sleep happens after the dock spinlock has been
 * dropped so other threads are not left spinning on it.
 */
inline static int __jobdock_get_empty(job_t **job)
{
        int ret, curno, no, used;
        dock_entry_t *ent;

        jobdock->idx = (jobdock->idx + 1) % jobdock->size;

        curno = jobdock->idx;
        no = curno;
        while (1) {
                if (jobdock->array[no] == NULL) {
                        ret = sy_spin_lock(&jobdock->lock);
                        if (unlikely(ret))
                                UNIMPLEMENTED(__DUMP__);

                        /* re-check under the lock: another thread may have allocated it */
                        if (jobdock->array[no] == NULL) {
                                ret = ymalloc((void *)&ent, sizeof(*ent));
                                if (unlikely(ret))
                                        UNIMPLEMENTED(__DUMP__);

                                __job_init(ent, no);

                                jobdock->array[no] = ent;
                        }

                        sy_spin_unlock(&jobdock->lock);
                }

                ent = jobdock->array[no];
                if (__job_trylock(ent) == 0) {
                        if (sy_spin_trylock(&ent->used) == 0)
                                goto found;  /* free table */

                        __job_unlock(ent);
                }

                no = (no + 1) % jobdock->size;
                if (no == curno)  /* not found */
                        break;
        }

        return ENOENT;
found:
        ent = jobdock->array[no];

        sy_spin_lock(&jobdock->lock);
        used = jobdock->used;
        jobdock->used++;
        sy_spin_unlock(&jobdock->lock);

        if (used > ((JOB_DOCK_SIZE * 3) / 4)) {
                DWARN("job dock busy %u %u\n", used, JOB_DOCK_SIZE);
                usleep(used);
        }

        *job = &ent->job;

        DBUG("used job[%u] %p\n", job_idx(*job), *job);

        return 0;
}

/*
 * Create the global job dock.
 *
 * @net_print: unused.
 * @allocate:  when non-zero all JOB_DOCK_SIZE entries are allocated up
 *             front; otherwise entries are created lazily on first use.
 *
 * Also creates and arms the watcher timer and the analysis handle.
 * Returns 0 on success or the error code of the failing step.
 */
int jobdock_init(net_print_func net_print, int allocate)
{
        int ret, i;
        uint32_t len;
        void *ptr;
        dock_entry_t *ent;

        (void) net_print;

        len = sizeof(dock_entry_t *) * JOB_DOCK_SIZE + sizeof(job_dock_t);

        ret = ymalloc(&ptr, len);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /*
         * Zero the header as well as the slot table so that counters such
         * as `used` and `last_use` start from a defined value.
         */
        memset(ptr, 0x0, len);

        jobdock = ptr;

        jobdock->size = JOB_DOCK_SIZE;
        jobdock->used = 0;
        jobdock->last_use = 0;
        jobdock->idx = 0;
        jobdock->seq = 0;
        jobdock->prev_time = gettime();
        jobdock->time_used = 1;

        if (allocate) {
                for (i = 0; i < jobdock->size; i++) {
                        YASSERT(jobdock->array[i] == NULL);
                        ret = ymalloc((void *)&ent, sizeof(*ent));
                        if (unlikely(ret))
                                UNIMPLEMENTED(__DUMP__);

                        __job_init(ent, i);
                        jobdock->array[i] = ent;
                }
        }

        YASSERT(jobdock->time_used);

        (void) sy_spin_init(&jobdock->lock);

        ret = timer1_create(&__jobdock_watch, "jobdock_watch", __jobdock_watcher, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = timer1_settime(&__jobdock_watch, USEC_PER_SEC * 10);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = analysis_create(&ana, "job_dock");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Allocate a job from the dock and prepare it for use.
 *
 * @_job:         receives the new job.
 * @__jobtracker: worker the job will be queued on.
 * @name:         job name; truncated to JOB_NAME_LEN - 1 characters.
 *
 * If the dock is full the call sleeps one second and retries, up to
 * MAX_RETRY * 10 times, before giving up via UNIMPLEMENTED(__DUMP__).
 * Returns 0 on success or the error from the slot search.
 */
int job_create(job_t **_job, worker_handler_t *__jobtracker, const char *name)
{
        int ret, retry = 0;
        job_t *job = NULL;
        dock_entry_t *ent;

retry:
        ret = __jobdock_get_empty(&job);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        DWARN("jobdock busy, %s used %u\n", name, jobdock->used);

                        if (retry < MAX_RETRY * 10) {
                                retry++;
                                sleep(1);
                                goto retry;
                        } else
                                UNIMPLEMENTED(__DUMP__);
                } else
                        GOTO(err_ret, ret);
        }

        ent = (void *)job;
        ent->id.seq = __jobdock_seq();
        ent->handler.seq = __jobdock_seq();
        ent->events = 1;

        /* strncpy does not terminate when the source fills the buffer */
        strncpy(job->name, name, JOB_NAME_LEN);
        job->name[JOB_NAME_LEN - 1] = '\0';

        job->context = NULL;
        job->worker = __jobtracker;
        job->state_machine = NULL;
        job->retry = 0;
        job->steps = 0;

        _gettimeofday(&job->timer.create, NULL);
        job->timer.step = job->timer.create;

        /* the entry lock was taken by __jobdock_get_empty() */
        __job_unlock(ent);

        //YASSERT(ent->lock == 1);

        *_job = job;

        return 0;
err_ret:
        return ret;
}

/*
 * Provide the job with a context buffer of at least `size` bytes.
 *
 * Requests of up to JOB_CONTEXT_LEN bytes use the buffer embedded in the
 * job; larger ones are logged and allocated with ymalloc(). Returns 0 on
 * success or the allocation error.
 */
int job_context_create(job_t *job, size_t size)
{
        int ret;

        //YASSERT(job->context == NULL);

        if (size > JOB_CONTEXT_LEN) {
                DWARN("large context %llu, %s\n", (LLU)size, job->name);

                ret = ymalloc(&job->context, size);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                job->context = job->__context__;
        }

        return 0;
err_ret:
        return ret;
}

/* Public entry point for releasing a job; forwards to __job_destroy(). */
void job_destroy(job_t *job)
{
        __job_destroy(job);

        return;
}

void jobdock_iterator()
{
        int i;
        job_t *job;
        time_t now, used;

        now = gettime();

        DINFO("used %u\n", jobdock->used);

        for (i = 0; i < jobdock->size; i++) {
                if (jobdock->array[i] == NULL)
                        continue;

                if (__jobdock_used(jobdock->array[i])) {
                        job = &jobdock->array[i]->job;
                        used = now - job->timer.create.tv_sec;

                        DINFO("%s used time %u\n", job->name,
                              (int)used);
                }
        }
}

/*
 * Return the current load figure of the dock.
 *
 * If time has advanced since the last update, the stored value is first
 * divided by (elapsed seconds + 1) and the update time is refreshed. All
 * of this happens under the dock lock.
 */
uint64_t jobdock_load()
{
        uint64_t load;
        uint32_t now, elapsed;

        sy_spin_lock(&jobdock->lock);

        now = gettime();

        if (now > jobdock->prev_time) {
                elapsed = now - jobdock->prev_time;

                DBUG("diff %u\n", elapsed);

                jobdock->time_used /= (elapsed + 1);
                jobdock->prev_time = now;
        }

        load = jobdock->time_used;

        DBUG("load %llu\n", (LLU)load);

        sy_spin_unlock(&jobdock->lock);

        return load;
}

/*
 * Return the value most recently stored for the job by __job_resume().
 * `idx` must be 0; `job` must not be NULL.
 */
int job_get_ret(job_t *job, int idx)
{
        dock_entry_t *ent = (dock_entry_t *)job;

        YASSERT(idx == 0);
        YASSERT((dock_entry_t *)job);

        return ent->ret;
}

/*
 * Look up the dock entry a handler refers to and lock it.
 *
 * @handler: handle naming the slot (jobid) and the expected sequence.
 * @_ent:    receives the locked entry on success.
 * @msg:     optional prefix for log messages; NULL suppresses them.
 * @force:   non-zero to block for the entry lock, zero to only try.
 *
 * Returns 0 with the entry lock held, the lock error if the lock could
 * not be taken, or ESTALE (lock released again) when the slot has been
 * reused for another job or is no longer allocated.
 */
static int __job_get_lock(const job_handler_t *handler,  dock_entry_t **_ent, const char *msg, int force)
{
        int ret;
        dock_entry_t *ent = jobdock->array[handler->jobid];
        job_t *job = &ent->job;

        if (!force) {
                ret = __job_trylock(ent);
                if (unlikely(ret))
                        goto err_ret;
        } else {
                ret = __job_lock(ent);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        /* a different sequence means the slot now belongs to another job */
        if (ent->handler.seq != handler->seq) {
                if (msg) {
                        DERROR("%s: table %d reused, prev name %s\n",
                               msg, job_idx(job), job->name);
                }

                ret = ESTALE;
                goto err_lock;
        }

        /* same sequence, but the job has already been destroyed */
        if (__jobdock_used(ent) == 0) {
                if (msg) {
                        DBUG("%s: table %d un-alloc'ed, prev name %s\n",
                             msg, job_idx(job), job->name);
                }

                ret = ESTALE;
                GOTO(err_lock, ret);
        }

        *_ent = ent;

        return 0;
err_lock:
        __job_unlock(ent);
err_ret:
        return ret;
}

/*
 * Record `retval` on the job named by the handler and queue the job on
 * its worker. Returns 0 or the error from queueing.
 */
static int __job_resume(job_handler_t _handler, int retval)
{
        int ret;
        dock_entry_t *ent = jobdock->array[_handler.jobid];
        job_t *job = &ent->job;

        ent->ret = retval;

        YASSERT(job->worker);

        ret = __jobdock_worker_queue(job->worker, &_handler);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Resume a job with the given return value.
 * `direct` is accepted but ignored; the job is always queued.
 */
int job_exec(job_handler_t _handler, int retval, int direct)
{
        int ret;

        (void) direct;

        ret = __job_resume(_handler, retval);

        return ret;
}

/*
 * Record the time spent since the previous mark of this job under `stage`.
 *
 * Does nothing and returns 0 when `job` is NULL or when
 * gloconf.performance_analysis is not 2. Otherwise the step timer is
 * reset, the elapsed time is handed to the analysis module, unusually
 * long steps are logged, and the elapsed time is returned.
 */
uint64_t job_timermark(job_t *job, const char *stage)
{
        struct timeval now;
        uint64_t time_used;
        double used;

        if (job == NULL || gloconf.performance_analysis != 2)
                return 0;

        _gettimeofday(&now, NULL);
        time_used = _time_used(&job->timer.step, &now);
        job->timer.step = now;

        analysis_queue(ana, job->name, stage, time_used);

        if (time_used > 1000 * 1000 * 5) {
                used = ((double)time_used / 1000) / 1000;

                if (used > 30) {
                        DERROR("job %s time used %f\n", job->name, used);
                } else {
                        DWARN("job %s time used %f\n", job->name, used);
                }
        }

        return time_used;
}

/*
 * Raise the dock load figure to `load` if it is currently lower, and
 * refresh the time of the last load update. Runs under the dock lock.
 */
void jobdock_load_set(uint64_t load)
{
        int ret = sy_spin_lock(&jobdock->lock);

        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        jobdock->prev_time = gettime();

        if (load > jobdock->time_used)
                jobdock->time_used = load;

        sy_spin_unlock(&jobdock->lock);
}

/*
 * Return a copy of the handler that identifies this job.
 *
 * `idx` must be 0. Asserts that the handler's slot index is in range and
 * that the dock slot really points back at this entry.
 */
job_handler_t job_handler(job_t *job, int idx)
{
        dock_entry_t *ent = (dock_entry_t *)job;
        int loc;

        YASSERT(idx == 0);

        loc = ent->handler.jobid;

        YASSERT(loc < JOB_DOCK_SIZE);
        YASSERT(jobdock->array[loc] == ent);

        return ent->handler;
}

static void __job_exec_func(void *obj)
{
        job_t *job;

        job = obj;

        job_exec(job_handler(job, 0), 0, EXEC_INDIRECT);
}

/*
 * Schedule the job to be resumed after `usec`.
 *
 * Registers a timer under the job's name whose callback is
 * __job_exec_func(). The delay is passed to timer_insert() as given.
 */
void job_sleep(job_t *job, suseconds_t usec)
{
        timer_insert(job->name , job, __job_exec_func, usec);
}

/*
 * Remove up to `max` handlers from the front of the queue.
 *
 * The removed handlers are copied into `array`; any remaining ones are
 * shifted to the front with ARRAY_POP. Returns the number of handlers
 * copied (0 when the queue is empty).
 */
static int __jobdock_worker_queue_pop(jobdock_queue_t *queue, job_handler_t *array, int max)
{
        int ret, count;

        ret = sy_spin_lock(&queue->lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        if (queue->count > max) {
                /* partial drain: take the first `max` and keep the rest */
                memcpy(array, queue->array, sizeof(*array) * max);
                ARRAY_POP(queue->array[0], max, queue->count);
                queue->count -= max;
                count = max;
        } else {
                /* everything fits: take it all and empty the queue */
                count = queue->count;
                memcpy(array, queue->array, sizeof(*array) * queue->count);
                queue->count = 0;
        }

        sy_spin_unlock(&queue->lock);

        return count;
}

/*
 * Worker callback: drain the queue passed as `ctx` and run each job.
 *
 * Handlers are popped one at a time. For each, the dock entry is locked,
 * the job's state machine is invoked, and the job is destroyed when the
 * state machine reports keep == 0. Handlers whose entry cannot be locked
 * or is stale are logged and skipped. Always returns 0.
 */
static int __jobdock_worker_exec(void *ctx)
{
        int ret, count, i, keep;
        /* NOTE(review): sized for MAX_QUEUED_LEN although only one handler is popped per call below */
        job_handler_t array[MAX_QUEUED_LEN];
        job_t *job;
        dock_entry_t *ent;
        jobdock_queue_t *queue;
        char name[MAX_NAME_LEN];
        uint64_t used;
        struct timeval t1, t2;
        char *_name;

        queue = ctx;

        while (1) {
                /* pop a single handler per iteration */
                count = __jobdock_worker_queue_pop(queue, array, 1);
                if (count == 0)
                        break;

                for (i = 0; i < count; i++) {
                        /* blocking lock; fails with ESTALE if the slot was reused or freed */
                        ret = __job_get_lock(&array[i], &ent, "worker_exec", 1);
                        if (unlikely(ret)) {
                                DERROR("job not found %u\n", i);
                                continue;
                        }

                        job = &ent->job;
                        /* keep a private copy: __job_destroy() below modifies job->name */
                        /* NOTE(review): unbounded copy - assumes a job name always fits in MAX_NAME_LEN; confirm */
                        strcpy(name, job->name);

                        //DINFO("exec job %s, idx %u count %u\n", job->name, ent->id.idx, queue->count);

                        /* time elapsed since job->queue_time was stamped */
                        _gettimeofday(&t1, NULL);
                        used = _time_used(&job->queue_time, &t1);

                        if (used > 1000 * 1000) {
                                DWARN("job %s time wait %f queue len %u at %s\n",
                                      name, (double)used / 1000 / 1000, queue->count, queue->name);
                        }

                        ANALYSIS_BEGIN(0);
                        YASSERT(job->state_machine);
                        /* NOTE(review): keep is not initialised here; the state machine is expected to set it */
                        job->state_machine(job, &keep);

                        if (keep == 0) {
                                __job_destroy(job);
                        }

                        _name = name;
                        ANALYSIS_END(0, 1000 * 20, _name);

                        _gettimeofday(&t2, NULL);
                        /* NOTE(review): this value is computed but never read afterwards */
                        used = _time_used(&t1, &t2);

                        /* the entry itself outlives __job_destroy(), so unlocking is still valid */
                        __job_unlock(ent);
                }
        }

        return 0;
}

/*
 * Allocate and initialise an empty worker queue.
 *
 * @_queue: receives the new queue on success.
 * @name:   queue name; truncated to fit the queue's name buffer.
 *
 * Returns 0 on success or the error code of the failing step.
 */
static int __jobdock_worker_queue_init(jobdock_queue_t **_queue, const char *name)
{
        int ret;
        jobdock_queue_t *queue;

        /* the allocation must be checked before the memory is touched */
        ret = ymalloc((void **)&queue, sizeof(*queue));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(queue, 0x0, sizeof(*queue));

        ret = sy_spin_init(&queue->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = sy_spin_init(&queue->used);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* bounded copy: never write past the fixed-size name buffer */
        strncpy(queue->name, name, sizeof(queue->name) - 1);
        queue->name[sizeof(queue->name) - 1] = '\0';

        *_queue = queue;

        return 0;
err_ret:
        return ret;
}

/*
 * Worker callback: append a job handler to the queue.
 *
 * @ctx: the jobdock_queue_t to append to.
 * @arg: the job_handler_t to enqueue (copied by value).
 *
 * Grows the queue array by QUEUE_EXTERN entries when it is about to fill
 * up, bumps the job's step counter and stamps its queue time. Returns 0 on
 * success, the reallocation error, or the error from looking up the job
 * (for example ESTALE). The queue lock is released on every return path.
 */
static int __jobdock_worker_queue_func(void *ctx, const void *arg)
{
        int ret;
        const job_handler_t *handler = arg;
        jobdock_queue_t *queue = ctx;
        job_t *job;
        dock_entry_t *ent;

        ret = sy_spin_lock(&queue->lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        if (queue->count + 1 >= queue->total) {
                ret = yrealloc((void **)&queue->array, queue->total * sizeof(job_handler_t),
                               (queue->total + QUEUE_EXTERN) * sizeof(job_handler_t));
                if (unlikely(ret))
                        GOTO(err_lock, ret);  /* the queue lock is held: release it */

                queue->total += QUEUE_EXTERN;

                if (queue->total > QUEUE_EXTERN) {
                        DINFO("queue %s count %u\n", queue->name, queue->total);
                }
        }

        ret = __job_get_lock(handler, &ent, "job_queue", 1);
        if (unlikely(ret)) {
                DERROR("job[%d] not found\n", handler->idx);
                GOTO(err_lock, ret);
        }

        job = &ent->job;
        job->steps++;
        _gettimeofday(&job->queue_time, NULL);

        //DINFO("queue job %s, idx %u count %u\n", job->name, ent->id.idx, queue->count);

        __job_unlock(ent);

        queue->array[queue->count] = *handler;
        queue->count++;

        sy_spin_unlock(&queue->lock);

        return 0;
err_lock:
        sy_spin_unlock(&queue->lock);
        return ret;
}

/*
 * Queue a job handler on a worker and then post the worker.
 * Returns 0 on success or the error from whichever of the two calls
 * failed first.
 */
static int __jobdock_worker_queue(const worker_handler_t *handler, job_handler_t *job_handler)
{
        int ret = worker_queue(handler, job_handler);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = worker_post(handler);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Create a worker that executes jobs from its own job queue.
 *
 * A fresh queue named `name` is created and registered with the worker as
 * its context, with __jobdock_worker_exec() and
 * __jobdock_worker_queue_func() as its callbacks. Returns 0 on success or
 * the error from queue or worker creation.
 */
int jobdock_worker_create(worker_handler_t *handler, const char *name)
{
        jobdock_queue_t *queue;
        int ret = __jobdock_worker_queue_init(&queue, name);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = worker_create(handler, name, __jobdock_worker_exec,
                            __jobdock_worker_queue_func, queue,
                            WORKER_TYPE_SEM, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}
